---
title: Streaming
description: Learn how to use the LlamaIndex workflow with streaming.
---

The `Workflow` API is designed for streaming by default. This guide shows how to consume the events of a workflow run as a stream.

Each `workflow.run` call returns a `WorkflowContext`, which implements the `AsyncIterable` interface, so you can iterate over the events of the run as they are emitted.
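To make the `AsyncIterable` part concrete, here is a minimal sketch in plain TypeScript that does not use the library at all. `DemoEvent`, `DemoRun`, and `collect` are hypothetical stand-ins invented for illustration; the only point is that any object exposing `[Symbol.asyncIterator]` can be consumed with `for await`.

```ts
// Stand-in event shapes; hypothetical, not the library's event classes.
type DemoEvent =
	| { kind: 'progress'; value: number }
	| { kind: 'stop'; result: number };

// Hypothetical stand-in for a workflow run: yields events as they are produced.
class DemoRun implements AsyncIterable<DemoEvent> {
	private readonly total: number;

	constructor(total: number) {
		this.total = total;
	}

	async *[Symbol.asyncIterator](): AsyncGenerator<DemoEvent> {
		let sum = 0;
		for (let i = 0; i < this.total; i++) {
			// Simulate asynchronous work between events.
			await new Promise((resolve) => setTimeout(resolve, 1));
			sum += i;
			yield { kind: 'progress', value: i };
		}
		yield { kind: 'stop', result: sum };
	}
}

// Consume the events one by one, in the order they arrive.
async function collect(total: number): Promise<DemoEvent[]> {
	const seen: DemoEvent[] = [];
	for await (const event of new DemoRun(total)) {
		seen.push(event);
	}
	return seen;
}
```

Calling `collect(4)` resolves to four `progress` events followed by one `stop` event whose `result` is `6`.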

```ts twoslash
import { Workflow, WorkflowEvent, StartEvent, StopEvent } from '@llamaindex/workflow';
class ComputeEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}
class ComputeResultEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}

type ContextData = {
	sum: number;
}

const workflow = new Workflow<ContextData, number, number>();
workflow.addStep({
	inputs: [StartEvent<number>],
	outputs: [StopEvent<number>]
}, async (context, startEvent) => {
	const total = startEvent.data;
	for (let i = 0; i < total; i++) {
		context.sendEvent(new ComputeEvent(i));
	}
	// The Workflow API lets you send events in parallel and wait for all of their results
	const computeResults = await Promise.all(Array.from({ length: total }).map(() => context.requireEvent(ComputeResultEvent)));
	context.data.sum = computeResults.reduce((acc, curr) => acc + curr.data, 0);
	return new StopEvent(context.data.sum);
});
```

We define a parallel computation workflow that computes the sum of the numbers from 0 to `total - 1`.

The workflow sends `ComputeEvent` events for each number and waits for `ComputeResultEvent` events. After receiving all `ComputeResultEvent` events, the workflow returns the sum as a `StopEvent`.
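The same fan-out and fan-in shape can be sketched with plain promises, without the library. `computeOne` and `parallelSum` are hypothetical names used only here: `computeOne` plays the role of a step that handles one `ComputeEvent`, and `Promise.all` plays the role of waiting for every `ComputeResultEvent`.

```ts
// Hypothetical worker standing in for a step that handles a single number.
async function computeOne(value: number): Promise<number> {
	// Simulate work that takes a random, short amount of time.
	await new Promise((resolve) => setTimeout(resolve, Math.random() * 5));
	return value;
}

// Fan out one task per number, then wait for every result before summing.
async function parallelSum(total: number): Promise<number> {
	const tasks = Array.from({ length: total }, (_, i) => computeOne(i));
	const results = await Promise.all(tasks);
	return results.reduce((acc, curr) => acc + curr, 0);
}
```

For example, `parallelSum(5)` resolves to `10`, the sum of 0, 1, 2, 3 and 4, regardless of the order in which the individual tasks finish.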

What if we want to cut the computation off once the sum exceeds a certain value?

## Streaming

```ts twoslash
import { Workflow, WorkflowEvent, StartEvent, StopEvent } from '@llamaindex/workflow';
class ComputeEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}
class ComputeResultEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}


type ContextData = {
	sum: number;
}

const workflow = new Workflow<ContextData, number, number>();
// ---cut---
const context = workflow.run(1000, {
	sum: 0
});

for await (const event of context) {
	if (event instanceof ComputeEvent) {
		if (context.data.sum > 100) {
			throw new Error('Sum exceeds 100');
		}
	}
	if (event instanceof StopEvent) {
		console.log('result', event.data);
	}
}
```

You can build more custom logic on top of the `AsyncIterable` interface.

For example, you can stop consuming the workflow as soon as the first `ComputeResultEvent` arrives:


```ts twoslash
import { Workflow, WorkflowEvent, StartEvent, StopEvent } from '@llamaindex/workflow';
class ComputeEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}
class ComputeResultEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}


type ContextData = {
	sum: number;
}

const workflow = new Workflow<ContextData, number, number>();
// ---cut---
async function compute() {
	const context = workflow.run(1000, {
		sum: 0
	});
	for await (const event of context) {
		if (event instanceof ComputeResultEvent) {
			return event.data;
		}
	}
	throw new Error('Workflow finished without a ComputeResultEvent');
}

const result = await compute();
```
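Leaving a `for await` loop early with `return` or `break` is standard JavaScript behaviour: the runtime calls the iterator's `return()` method, which gives the producer a chance to clean up. The sketch below shows this with a plain async generator. `demoEvents`, `firstMatch`, and `log` are hypothetical names, and whether the library's context also stops the underlying workflow at that point is not something this sketch demonstrates.

```ts
// Records what the producer did, so the effect of an early exit is visible.
const log: string[] = [];

// Hypothetical stand-in for a stream of workflow events.
async function* demoEvents(): AsyncGenerator<number> {
	try {
		for (let i = 0; i < 1000; i++) {
			log.push(`produced ${i}`);
			yield i;
		}
	} finally {
		// Runs when the consumer leaves the loop before the stream is exhausted.
		log.push('cleanup');
	}
}

// Return the first value that satisfies the predicate and stop consuming.
async function firstMatch(predicate: (n: number) => boolean): Promise<number> {
	for await (const value of demoEvents()) {
		if (predicate(value)) {
			return value;
		}
	}
	throw new Error('Stream ended without a match');
}
```

After `await firstMatch((n) => n === 2)`, `log` contains `produced 0`, `produced 1`, `produced 2`, and `cleanup`: the generator never produces the remaining values.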

### Streaming with UI

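You can use the `Workflow` API with UI libraries like React. The following server action iterates over the workflow events in the background and immediately returns a streamable UI value created with `createStreamableUI` from `ai/rsc`.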

```tsx twoslash
// @filename: utils.ts
export async function runWithoutBlocking(fn: () => Promise<void>) {
	fn();
}
// @filename: action.ts
// ---cut---
'use server';
// The "use server" directive marks the exports of this file as server-side functions in React
import { createStreamableUI } from 'ai/rsc';
import { runWithoutBlocking } from './utils';
// ---cut-start---
import { Workflow, WorkflowEvent, StartEvent, StopEvent } from '@llamaindex/workflow';
class ComputeEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}
class ComputeResultEvent extends WorkflowEvent<number> {
	constructor(data: number) {
		super(data);
	}
}


type ContextData = {
	sum: number;
}

const workflow = new Workflow<ContextData, number, number>();
const min = 100;
const max = 1000;
workflow.addStep(
	{
		inputs: [ComputeEvent],
		outputs: [ComputeResultEvent]
	},
	async (context, event) => {
		await new Promise((resolve) =>
			setTimeout(resolve, Math.floor(Math.random() * (max - min + 1) + min))
		);
		return new ComputeResultEvent(event.data);
	}
);
// ---cut-end---
export async function compute() {
	'use server';
	const ui = createStreamableUI();
	const context = workflow.run(100, {
		sum: 0
	});
	runWithoutBlocking(async () => {
		for await (const event of context) {
			if (event instanceof ComputeResultEvent) {
				// Update UI
			} else if (event instanceof StopEvent) {
				// Update UI
			}
			// ...
		}
	});
	return ui.value;
}
```
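The `runWithoutBlocking` helper above starts the task and ignores its promise, so an error thrown inside the loop would surface only as an unhandled rejection. A possible variant, sketched here under the hypothetical name `runInBackground`, forwards failures to a callback instead. It is an illustration, not part of the library or of `ai/rsc`.

```ts
// Hypothetical variant of runWithoutBlocking that reports failures to a callback.
function runInBackground(
	fn: () => Promise<void>,
	onError: (error: unknown) => void
): void {
	// Start the task without awaiting it and route any rejection to onError.
	fn().catch(onError);
}

// Usage: the caller continues immediately while the task runs in the background.
runInBackground(
	async () => {
		await new Promise((resolve) => setTimeout(resolve, 1));
	},
	(error) => console.error('background task failed', error)
);
```

In the server action, the `onError` callback would be a natural place to surface the failure to the UI rather than leaving the stream open.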

<WorkflowStreamingDemo />